Skip to content

Conversation

@kixelated
Copy link
Collaborator

Summary

  • Adds TrackProducer::is_closed() to check if a track has been closed/aborted
  • BroadcastConsumer::subscribe_track() now checks liveness before returning a cached producer, and removes stale entries so a fresh request is issued
  • Adds a stale_producer test verifying late-joining subscribers get a new producer after publisher disconnect

Fixes #941

Test plan

  • cargo test -p moq-lite — all 142 tests pass including new stale_producer test
  • just check — full CI passes

🤖 Generated with Claude Code

When a publisher disconnects, the cached TrackProducer in
BroadcastConsumer::subscribe_track() was returned to late-joining
subscribers even though it was already closed. Now we check
is_closed() before returning a cached producer, and remove stale
entries so a fresh request is issued.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Feb 12, 2026

Walkthrough

This change modifies the broadcast consumer's track subscription logic to handle stale closed producers. When a matching producer exists but is closed, the system now removes it and creates a new one instead of returning the closed producer. To support this detection, a new public method is_closed() was added to TrackProducer that checks if the internal closed state is set. A test case verifies the behavior by simulating a publisher disconnect and confirming that subsequent subscriptions receive a fresh producer with a new request.

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The title 'Fix stale TrackProducer returned from cache' directly describes the main change: handling stale cached producers instead of returning them unconditionally.
Description check ✅ Passed The description clearly outlines the changes: adding is_closed(), checking liveness in subscribe_track(), removing stale entries, and includes test verification details.
Linked Issues check ✅ Passed The PR fully implements the suggested fix from issue #941: adds TrackProducer::is_closed(), checks liveness before returning cached producers, removes stale entries, and includes the stale_producer test.
Out of Scope Changes check ✅ Passed All changes are directly related to fixing issue #941: the is_closed() method, the subscribe_track() liveness check, stale entry removal, and the stale_producer test are all within scope.
Docstring Coverage ✅ Passed Docstring coverage is 85.71% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch fix/stale-track-producer

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@rs/moq-lite/src/model/broadcast.rs`:
- Around line 195-201: The removal must be guarded by identity so the cleanup
task only evicts the exact producer instance it created: when the cleanup task
(spawned after producer insertion and awaiting producer.unused()) wants to
remove, lock state and check state.producers.get(&producer.info.name) — only
call state.producers.remove(&producer.info.name) if the stored value is the same
instance (e.g. Arc::ptr_eq(stored, &producer) or compare a unique producer id
field); apply the same identity check before removing the "stale" entry in the
early-return path where you see state.producers.remove(&track.name) to avoid
removing a newly inserted producer with the same name.
🧹 Nitpick comments (1)
rs/moq-lite/src/model/broadcast.rs (1)

402-429: Good test coverage for the core fix.

The test correctly verifies that a closed producer is not returned from cache. Consider adding a follow-up assertion that drops track1 (the old consumer), yields to the runtime (tokio::task::yield_now().await), and then verifies track2 still works and a third subscribe_track still deduplicates with track2 — this would catch the cleanup-task race described above.

Comment on lines 195 to 201
if let Some(producer) = state.producers.get(&track.name) {
return producer.consume();
if !producer.is_closed() {
return producer.consume();
}
// Remove the stale entry
state.producers.remove(&track.name);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, read the relevant section with context
cat -n rs/moq-lite/src/model/broadcast.rs | sed -n '180,240p'

Repository: moq-dev/moq

Length of output: 2265


🏁 Script executed:

# Check if is_clone() method exists on the producer type
rg -n "is_clone|fn is_clone" rs/moq-lite/src/model/broadcast.rs

Repository: moq-dev/moq

Length of output: 291


🏁 Script executed:

# Check the producer type definition and its methods
rg -n "impl.*Producer|fn unused" rs/moq-lite/src/model/broadcast.rs

Repository: moq-dev/moq

Length of output: 317


🏁 Script executed:

# Look for the full file structure to understand the type
git ls-files rs/moq-lite/src/model/ | head -20

Repository: moq-dev/moq

Length of output: 273


🏁 Script executed:

# Check if there are other places where producers.remove is used that might have same issue
rg -n "producers\.remove" rs/moq-lite/src/model/broadcast.rs

Repository: moq-dev/moq

Length of output: 243


🏁 Script executed:

# Examine the actual unused() implementation to confirm it waits for consumers
sed -n '112,125p' rs/moq-lite/src/model/broadcast.rs

Repository: moq-dev/moq

Length of output: 407


🏁 Script executed:

# Check the test coverage for this scenario
sed -n '280,310p' rs/moq-lite/src/model/broadcast.rs

Repository: moq-dev/moq

Length of output: 831


🏁 Script executed:

# Check if there are existing tests for concurrent subscribe_track scenarios
rg -n "#\[tokio::test\]|#\[test\]" rs/moq-lite/src/model/broadcast.rs | head -20

Repository: moq-dev/moq

Length of output: 194


🏁 Script executed:

# Verify the fix is syntactically correct by checking usage of is_clone elsewhere
rg -B3 -A3 "is_clone" rs/moq-lite/src/model/broadcast.rs | head -40

Repository: moq-dev/moq

Length of output: 1139


Stale cleanup task can remove the new producer from the map due to async race condition.

When a producer is first inserted, a cleanup task is spawned (lines 224–227) that removes the entry by track name once unused() resolves (all consumers dropped). If the same track is subscribed again while the old producer's cleanup task is still pending, the code removes the stale entry (line 200) and inserts a fresh producer at the same key (line 220). The old cleanup task will eventually call state.lock().producers.remove(&producer.info.name), which removes the new producer from the map by name, breaking deduplication for subsequent subscribers.

Guard the removal with an identity check to ensure only the original producer instance is evicted:

Proposed fix
		let state = self.state.clone();
+		let producer_ref = producer.clone();
 		web_async::spawn(async move {
-			producer.unused().await;
-			state.lock().producers.remove(&producer.info.name);
+			producer_ref.unused().await;
+			let mut s = state.lock();
+			if let Some(existing) = s.producers.get(&producer_ref.info.name) {
+				if existing.is_clone(&producer_ref) {
+					s.producers.remove(&producer_ref.info.name);
+				}
+			}
 		});
🤖 Prompt for AI Agents
In `@rs/moq-lite/src/model/broadcast.rs` around lines 195 - 201, The removal must
be guarded by identity so the cleanup task only evicts the exact producer
instance it created: when the cleanup task (spawned after producer insertion and
awaiting producer.unused()) wants to remove, lock state and check
state.producers.get(&producer.info.name) — only call
state.producers.remove(&producer.info.name) if the stored value is the same
instance (e.g. Arc::ptr_eq(stored, &producer) or compare a unique producer id
field); apply the same identity check before removing the "stale" entry in the
early-return path where you see state.producers.remove(&track.name) to avoid
removing a newly inserted producer with the same name.

@kixelated kixelated enabled auto-merge (squash) February 12, 2026 15:30
@kixelated kixelated merged commit 31dd649 into main Feb 12, 2026
1 check passed
@kixelated kixelated deleted the fix/stale-track-producer branch February 12, 2026 15:31
@moq-bot moq-bot bot mentioned this pull request Feb 12, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Stale TrackProducer returned from BroadcastConsumer::subscribe_track() cache

1 participant